Conversation
michaeljguarino
left a comment
this would need graphql exposure of the deployment_settings metrics fields, and also integration into the k8s operator too. You can probably get cursor agent to do it for you tbh.
```elixir
@@ -0,0 +1,95 @@
defmodule Console.Otel.Exporter do
```
Is there no library that handles this at the moment? There might not be, but it would be nice to use one if so
I think the existing opentelemetry_exporter library is still in dev status (not guaranteed to be stable)?
```elixir
  """
  @spec maybe_export_metrics() :: :ok | {:error, term}
  def maybe_export_metrics do
    case Settings.fetch() do
```
convert this into a with expression (almost all nested cases can become them)
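For illustration, a sketch of that refactor with hypothetical names (`fetch/1` stands in for `Settings.fetch/0`; these are not the actual Console modules):

```elixir
defmodule CronRefactor do
  # Hypothetical settings lookup standing in for Settings.fetch/0.
  def fetch(%{metrics: %{enabled: true, endpoint: ep}}) when is_binary(ep),
    do: {:ok, ep}

  def fetch(_), do: {:error, :not_configured}

  # Nested-case style (before):
  def endpoint_nested(settings) do
    case fetch(settings) do
      {:ok, ep} ->
        case URI.parse(ep) do
          %URI{scheme: s} when s in ~w(http https) -> {:ok, ep}
          _ -> {:error, :bad_endpoint}
        end

      err ->
        err
    end
  end

  # Equivalent with-expression (after): each clause matches the happy
  # path, and any non-matching value falls into the else block.
  def endpoint_with(settings) do
    with {:ok, ep} <- fetch(settings),
         %URI{scheme: s} when s in ~w(http https) <- URI.parse(ep) do
      {:ok, ep}
    else
      %URI{} -> {:error, :bad_endpoint}
      err -> err
    end
  end
end
```

Both functions return the same results; the `with` version reads linearly instead of nesting one level per fallible step.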
lib/console/otel/cron.ex
Outdated
```elixir
  Use this for direct invocation (testing, manual runs).
  """
  @spec export_metrics() :: :ok | {:error, term}
  def export_metrics do
```
lib/console/otel/cron.ex
Outdated
```elixir
    end
  end

  defp should_run?(crontab) do
```
i don't think this will work w/o some statefulness (you need to keep around the last run date somewhere to compute the next run time, we do that everywhere in the db-bound crons).
A way to simulate that would be to convert this into a genserver, keep the last run time in genserver state, and only run the metrics export if the node is a determined leader (use the Console.ClusterRing module to determine that)
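A minimal sketch of that shape, with stubs in place of the real pieces (`leader?/0` stands in for the `Console.ClusterRing` check, `export_metrics/0` for the real export, and a fixed 60s period for the configured crontab):

```elixir
defmodule MetricsCron do
  use GenServer

  @check_interval :timer.seconds(30)
  @period_seconds 60

  def start_link(opts \\ []),
    do: GenServer.start_link(__MODULE__, opts, name: __MODULE__)

  @impl true
  def init(_opts) do
    :timer.send_interval(@check_interval, :check)
    {:ok, %{last_run_at: nil}}
  end

  @impl true
  def handle_info(:check, state) do
    now = DateTime.utc_now()

    # Only the elected leader exports, and only once per period;
    # the last run time lives in GenServer state, not the DB.
    if leader?() and due?(state.last_run_at, now) do
      export_metrics()
      {:noreply, %{state | last_run_at: now}}
    else
      {:noreply, state}
    end
  end

  # Pure helper: has a full period elapsed since the last run?
  def due?(nil, _now), do: true
  def due?(last, now), do: DateTime.diff(now, last, :second) >= @period_seconds

  # Stubs standing in for Console.ClusterRing and the real exporter.
  defp leader?, do: true
  defp export_metrics, do: :ok
end
```

The real version would compute the next run from the crontab expression against `last_run_at` rather than a fixed period, but the statefulness lives in the same place.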
lib/console/otel/metrics_exporter.ex
Outdated
```elixir
    []
    |> Stream.concat(build_service_metrics(timestamp))
    |> Stream.concat(build_cluster_metrics(timestamp))
    |> Enum.to_list()
```
doing an Enum.to_list eliminates the whole purpose of the stream (maintaining O(1) memory usage). What you should do here instead is pipe the concat'ed stream into Stream.chunk_every/2 (with 100 or some other chunk size), and then pipe that to the exporter.
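The chunked shape, sketched with a stub exporter function (`export_fun` and the chunk size are placeholders for the real `Exporter` call and config):

```elixir
defmodule ChunkedExport do
  @chunk_size 100

  # export_fun.(chunk) returns :ok | {:error, reason}. Only one chunk
  # of metrics is materialized at a time, so memory stays O(chunk_size)
  # regardless of how many rows the upstream streams yield.
  def export(service_stream, cluster_stream, export_fun) do
    []
    |> Stream.concat(service_stream)
    |> Stream.concat(cluster_stream)
    |> Stream.chunk_every(@chunk_size)
    |> Enum.reduce(0, fn chunk, count ->
      case export_fun.(chunk) do
        :ok -> count + length(chunk)
        {:error, _reason} -> count
      end
    end)
  end
end
```

The `Enum.reduce/3` at the end is what finally forces the stream, one chunk per exporter call.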
lib/console/otel/metrics_exporter.ex
Outdated
```elixir
    |> Stream.flat_map(&build_cluster_metrics_list(&1, timestamp))
  end

  defp build_service_metric(%Service{} = service, timestamp) do
```
mostly a style nit, but i'd consider moving this metric formatting code to another module
lib/console/otel/metrics_exporter.ex
Outdated
```elixir
      |> DateTime.to_naive()
      |> NaiveDateTime.add(60, :second)

    next = Crontab.Scheduler.get_next_run_date!(expr, base_time)
```
i'd generally make these a with expression with {:ok, _} tuple matches down the line so you don't needlessly crash the genserver on bad input
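The general pattern, sketched with a stdlib parsing call (a hypothetical helper; in the real code the non-bang variants of the Crontab functions would take the place of `NaiveDateTime.from_iso8601/1` here):

```elixir
defmodule SafeSchedule do
  # Bang-free version: bad input yields {:error, reason} instead of
  # raising inside the GenServer. A single `with` clause suffices
  # because the error tuple falls through unchanged.
  def next_minute(iso_timestamp) do
    with {:ok, base} <- NaiveDateTime.from_iso8601(iso_timestamp) do
      {:ok, NaiveDateTime.add(base, 60, :second)}
    end
  end
end
```

Callers can then match `{:ok, next}` / `{:error, _}` and schedule or skip, rather than wrapping the bang version in try/rescue.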
michaeljguarino
left a comment
looks mostly fine, minus a few nits
lib/console/otel/metrics_exporter.ex
Outdated
```elixir
  defp do_export(endpoint) do
    timestamp = DateTime.utc_now()

    Repo.transaction(fn ->
```
don't worry about the transaction here, it's read-only so no need to roll back
lib/console/otel/metrics_exporter.ex
Outdated
```elixir
        count
      end
    end)
    |> case do
```
can probably just make this `|> then(&Logger.info("Exported #{&1} metrics to #{endpoint}"))`. It's not hard to see that 0 metrics implies it didn't export
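i.e. the trailing case collapses into `Kernel.then/2` (sketch with illustrative literals for the count and endpoint):

```elixir
# `endpoint` and the count 42 are made-up values for illustration.
endpoint = "http://collector:4318"

message = then(42, &"Exported #{&1} metrics to #{endpoint}")
# A count of 0 in the emitted log line already signals that nothing
# was exported, so no separate zero branch is needed.
```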
one thing that still isn't here is the kubernetes CRD definition for setting these fields. You need to:
- Expose the inputs in gql so the client can modify them
- Modify the crd type
- Modify the controller to wire it up
Oftentimes i can just get ai to do this for me now, but you probably need to familiarize yourself enough with the structure to prompt it well
85dfc18 to 572fa02
Greptile Summary

This PR adds an OpenTelemetry metrics export pipeline: a new
Confidence Score: 4/5

Safe to merge after addressing the silent transaction failure in do_export/1. One P1 defect: DB transaction errors in do_export are silently swallowed, last_run_at is updated regardless, and no error is logged — making export failures invisible. The remaining findings are P2 (unused struct field, dead Stream.reject, minor leader-check race). The core pipeline logic, schema validation, migration, GraphQL types, and Go CRD are all well-structured and well-tested.

lib/console/otel/metrics_exporter.ex — transaction error handling and check_ref cleanup
| Filename | Overview |
|---|---|
| lib/console/otel/metrics_exporter.ex | New GenServer that schedules and drives OTel metric exports; has a silent transaction failure bug and a minor leader-check race in the :export handler. |
| lib/console/otel/exporter.ex | HTTP OTLP/JSON exporter using Req; cleanly formats metrics and handles errors. |
| lib/console/otel/metrics_builder.ex | Pure functions building OTel metric maps from Cluster/Service records; has a harmless dead Stream.reject call. |
| lib/console/schema/deployment_settings.ex | Adds embedded Metrics schema with enabled/endpoint/crontab fields and proper validation (required-if-enabled + cron syntax check). |
| priv/repo/migrations/20250429000000_add_metrics_export_settings.exs | Adds :map column for metrics embedded schema to deployment_settings table. |
| go/controller/api/v1alpha1/deploymentsettings_types.go | Adds MetricsSettings CRD type with kubebuilder markers and an Attributes() conversion method wired into the controller. |
| lib/console/graphql/deployments/settings.ex | Adds MetricsSettings input/output GraphQL types and wires them into the DeploymentSettings object and mutation input. |
| lib/console/application.ex | Registers Console.Otel.MetricsExporter as a supervised child process. |
Reviews (1): Last reviewed commit: "add otel exports"
```elixir
  defp do_export(endpoint) do
    timestamp = DateTime.utc_now()

    # Transaction is required for Repo.stream cursor support
    Repo.transaction(fn ->
      []
      |> Stream.concat(MetricsBuilder.service_metrics_stream(timestamp))
      |> Stream.concat(MetricsBuilder.cluster_metrics_stream(timestamp))
      |> Stream.chunk_every(@chunk_size)
      |> Enum.reduce(0, fn chunk, count ->
        case Exporter.export(endpoint, chunk) do
          :ok ->
            count + length(chunk)

          {:error, reason} ->
            Logger.error("Failed to export chunk: #{inspect(reason)}")
            count
        end
      end)
      |> then(&Logger.info("Exported #{&1} metrics to #{endpoint}"))
    end)

    :ok
  end
```
Transaction errors are silently swallowed
Repo.transaction/1 returns {:ok, result} or {:error, reason}, but the return value is discarded and do_export/1 unconditionally returns :ok. If the database transaction fails (connection error, lock timeout, etc.), there is no error log and the caller in handle_info(:export) still updates last_run_at as if the export succeeded — masking the failure entirely.
```elixir
defp do_export(endpoint) do
  timestamp = DateTime.utc_now()

  result =
    Repo.transaction(fn ->
      []
      |> Stream.concat(MetricsBuilder.service_metrics_stream(timestamp))
      |> Stream.concat(MetricsBuilder.cluster_metrics_stream(timestamp))
      |> Stream.chunk_every(@chunk_size)
      |> Enum.reduce(0, fn chunk, count ->
        case Exporter.export(endpoint, chunk) do
          :ok ->
            count + length(chunk)

          {:error, reason} ->
            Logger.error("Failed to export chunk: #{inspect(reason)}")
            count
        end
      end)
      |> then(&Logger.info("Exported #{&1} metrics to #{endpoint}"))
    end)

  case result do
    {:ok, _} ->
      :ok

    {:error, reason} ->
      Logger.error("Metrics export transaction failed: #{inspect(reason)}")
      {:error, reason}
  end
end
```

And in handle_info(:export), only update last_run_at on :ok to allow a retry on the next schedule cycle.
```elixir
  defmodule State do
    defstruct [:last_run_at, :timer_ref, :check_ref]
  end
```
check_ref field is never populated
State declares a check_ref field but :timer.send_interval/2 is called in init/1 without capturing its return value, so check_ref is always nil. The field is unused throughout the module and can be removed to avoid confusion.
Suggested change:

```diff
 defmodule State do
-  defstruct [:last_run_at, :timer_ref, :check_ref]
+  defstruct [:last_run_at, :timer_ref]
 end
```
```elixir
  def handle_info(:export, state) do
    case get_config() do
      {:ok, %{endpoint: endpoint}} ->
        do_export(endpoint)
        {:noreply, %{state | last_run_at: DateTime.utc_now(), timer_ref: nil}}

      _ ->
        {:noreply, %{state | timer_ref: nil}}
    end
  end
```
Leader check skipped in :export handler
handle_info(:export, state) only checks whether a config is present — it does not verify that this node is still the leader. If :export is already in the process mailbox when handle_info(:check) cancels the timer due to a leadership change (the timer has already fired so Process.cancel_timer returns 0 but the message is still queued), the export will run on the non-leader node, producing duplicate metric data points.
Consider adding a leader guard:
```elixir
def handle_info(:export, state) do
  case {leader?(), get_config()} do
    {true, {:ok, %{endpoint: endpoint}}} ->
      do_export(endpoint)
      {:noreply, %{state | last_run_at: DateTime.utc_now(), timer_ref: nil}}

    _ ->
      {:noreply, %{state | timer_ref: nil}}
  end
end
```
lib/console/otel/metrics_builder.ex
Outdated
```elixir
    |> Repo.stream(method: :keyset)
    |> Console.throttle(count: 500, pause: 50)
    |> Stream.map(&build_service_metric(&1, timestamp))
    |> Stream.reject(&is_nil/1)
```
572fa02 to 1aa8190
Add OTel exports
Test Plan
Unit tests
Checklist
Plural Flow: console